Parquet: add adaptive bloom filter sizing (PARQUET-2326)#16363

Open
raghav-reglobe wants to merge 1 commit into apache:main from raghav-reglobe:parquet-adaptive-bloom-filter

@raghav-reglobe

What changes are proposed in this pull request?

Add a new table property to enable parquet-mr's adaptive bloom filter sizing
(PARQUET-2326) for Iceberg-managed Parquet writes:

  • write.parquet.bloom-filter-adaptive-enabled (boolean, default false)

When enabled, parquet-mr's ColumnValueCollector.initBloomFilter() constructs an
AdaptiveBlockSplitBloomFilter instead of BlockSplitBloomFilter. The adaptive variant evaluates
N candidate filter sizes and picks the smallest that satisfies actual NDV at the configured FPP,
instead of always pre-allocating bloom-filter-max-bytes.
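
To make the selection rule concrete, here is a minimal, self-contained sketch. It is not parquet-mr's code: it assumes the candidates are successive halvings of the maximum size, it assumes a 32-byte lower bound, and it uses the usual split-block sizing formula with 8 bits set per value. The class name, method names, and the candidate count of 5 are all hypothetical choices for illustration.

```java
// Illustrative sketch only; not the parquet-mr implementation.
final class AdaptiveSizingSketch {
    // Assumed lower bound for a candidate filter (one 32-byte block).
    static final int MIN_BYTES = 32;

    // Bits needed to hold `ndv` distinct values at false-positive
    // probability `fpp`, assuming 8 bits are set per inserted value.
    static long requiredBits(long ndv, double fpp) {
        double bits = -8.0 * ndv / Math.log(1.0 - Math.pow(fpp, 1.0 / 8.0));
        return (long) Math.ceil(bits);
    }

    // Walk candidates from maxBytes downward by halving (an assumption) and
    // keep the smallest one that still holds `ndv` values at `fpp`.
    // Falls back to maxBytes when even the largest candidate is too small.
    static int pickSizeBytes(long ndv, double fpp, int maxBytes, int numCandidates) {
        long neededBytes = (requiredBits(ndv, fpp) + 7) / 8;
        int chosen = maxBytes;
        int candidate = maxBytes;
        for (int i = 0; i < numCandidates && candidate >= MIN_BYTES; i++) {
            if (candidate < neededBytes) {
                break; // smaller candidates cannot satisfy the FPP either
            }
            chosen = candidate;
            candidate /= 2;
        }
        return chosen;
    }

    public static void main(String[] args) {
        // 5 distinct values, 1% FPP, 4 MiB cap, 5 candidates (assumed count).
        System.out.println(pickSizeBytes(5, 0.01, 4_194_304, 5));
    }
}
```

Under these assumptions, a 5-value column with a 4 MiB cap and 5 candidates lands on the smallest candidate, 262,144 bytes (4 MiB / 16). That is in the same range as the 268,465-byte file reported in this PR, although the PR itself does not state the candidate count.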

Why are the changes needed?

Today, when a bloom filter is enabled on a column without a per-column NDV, parquet-mr's
ColumnValueCollector.initBloomFilter() allocates a fixed bloom-filter-max-bytes buffer:

// from ColumnValueCollector
} else {
  this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
}

The buffer is then written to disk via ParquetFileWriter.serializeBloomFilters() regardless of
how many values were inserted. For low-row-count writes this produces a file dominated by a
nearly empty bloom filter.

Empirical observations from a Spark Structured Streaming + Iceberg pipeline (~720 silver
tables on a warm 600 s trigger):

| TBLPROPERTIES | File size for 5-row write |
| --- | --- |
| no bloom | 902 bytes |
| bloom-enabled.col.id=true, max-bytes=4194304 | 4,201,826 bytes (~4 MiB) |
| + adaptive (this PR) | 268,465 bytes (~16x reduction) |

For workloads that produce frequent low-row-count microbatches (CDC streaming, frequent commits),
this is a significant storage and S3 PUT cost reduction.

How was this patch tested?

  • New unit test TestParquetAdaptiveBloomFilter covers both:
    • Adaptive enabled — verifies file is at least 2x smaller than non-adaptive
    • Default behavior — verifies existing (non-adaptive) behavior is preserved when
      the property is not set
  • Empirically verified on a production Spark Structured Streaming + Iceberg pipeline.
    File sizes dropped from ~4 MiB to ~262 KiB (268,465 bytes) on streaming microbatch outputs.

Backward compatibility

Default value is false, so existing tables and writers see no behavior change.
Operators opt in by setting write.parquet.bloom-filter-adaptive-enabled=true.
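
The default-off behavior can be sketched with a plain property map. This is a hedged illustration rather than the PR's code: the class, the constant names, and the `propertyAsBoolean` helper are hypothetical stand-ins written here so the example is self-contained. Only the property key and its `false` default come from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; constant and helper names are hypothetical.
final class AdaptiveBloomPropsSketch {
    // Property key and default as described in this PR.
    static final String ADAPTIVE_ENABLED = "write.parquet.bloom-filter-adaptive-enabled";
    static final boolean ADAPTIVE_ENABLED_DEFAULT = false;

    // A missing key falls back to the default, so tables that never set
    // the property keep the existing non-adaptive behavior.
    static boolean propertyAsBoolean(Map<String, String> props, String key, boolean defaultValue) {
        String value = props.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Map<String, String> tableProps = new HashMap<>();

        // Existing table: property absent, adaptive sizing stays off.
        System.out.println(propertyAsBoolean(tableProps, ADAPTIVE_ENABLED, ADAPTIVE_ENABLED_DEFAULT));

        // Operator opts in explicitly.
        tableProps.put(ADAPTIVE_ENABLED, "true");
        System.out.println(propertyAsBoolean(tableProps, ADAPTIVE_ENABLED, ADAPTIVE_ENABLED_DEFAULT));
    }
}
```

On a real table the same key would be set through whatever mechanism the engine exposes for table properties, for example TBLPROPERTIES in Spark SQL.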

Scope

This PR modifies the createWriterFunc code path (used by Spark, Flink, and other engines for
data writes). The legacy ParquetWriteBuilder fallback path (createWriterFunc == null) is
unchanged. If maintainers want adaptive support on the legacy path as well, happy to extend in
a follow-up.

Files changed

  • core/src/main/java/org/apache/iceberg/TableProperties.java — add 1 constant + default
  • parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java — wire the property through
    Context + use in WriteBuilder
  • parquet/src/test/java/org/apache/iceberg/parquet/TestParquetAdaptiveBloomFilter.java — new test

Add a new table property to enable parquet-mr's adaptive bloom filter
sizing for Iceberg-managed Parquet writes:

  write.parquet.bloom-filter-adaptive-enabled (boolean, default false)

When enabled, parquet-mr's `ColumnValueCollector.initBloomFilter`
constructs an `AdaptiveBlockSplitBloomFilter` instead of
`BlockSplitBloomFilter`. The adaptive variant evaluates N candidate
filter sizes and picks the smallest that satisfies actual NDV at the
configured FPP, instead of pre-allocating `bloom-filter-max-bytes`.

Why: when a bloom filter is enabled on a column without a per-column NDV,
parquet-mr's writer allocates a fixed `bloom-filter-max-bytes` buffer
per column and writes it to disk regardless of how many values were
inserted. For low-row-count writes this produces a file dominated by
a nearly empty bloom filter buffer.

Empirical reduction on a Spark Structured Streaming + Iceberg
pipeline: 5-row write with `bloom-filter-max-bytes=4194304` shrinks
from 4,201,826 bytes to 268,465 bytes (~16x).

Defaults to false to preserve current behavior. Operators opt in by
setting `write.parquet.bloom-filter-adaptive-enabled=true`.

Modifies the `createWriterFunc` write path (used by Spark/Flink data
writes). The legacy `ParquetWriteBuilder` fallback path is unchanged.